select模块提供了等待数据流的事件功能,使用这个模块需要建立wifi网络连接。下面的示例需要先执行以下连接语句(这些语句将会创建一个UDP服务器,我们可以使用手机创建热点并使用下面的代码连接,之后使用手机网路调试助手创建UDP客户端进行测试)。

import socket
import network
import time
import select

SSID = "dfrobotYanfa"    #修改为你WiFi的名称
PASSWORD = "hidfrobot"   #修改为你WiFi的密码
port = 10000   #端口号
wlan = None   #wlan
s = None   #套接字

wlan = network.WLAN(network.STA_IF)
wlan.active(True)   #激活网络
wlan.disconnect()
wlan.connect(SSID, PASSWORD)

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   #创建套接字
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)   #设置套接字
ip = wlan.ifconfig()[0]
s.bind((ip, port))   #绑定套接字

  • select.POLLIN   = 1     — 有数据可读取
  • select.POLLOUT   = 4     — 可以写入数据
  • select.POLLERR   = 8     — 发生错误
  • select.POLLHUP   = 16     — 数据流结束/连接中断

函数

1. select.select(rlist, wlist, xlist[, timeout])

函数说明:监控对象何时可读或可写,一旦监控的对象状态改变,返回结果(阻塞线程)。这个函数是为了兼容,效率不高,推荐用 poll 函数。

rlist:等待读就绪的文件描述符数组
wlist:等待写就绪的文件描述符数组
xlist:等待异常的数组
timeout:等待时间(单位:秒)

示例:

def selectTest():
  global s
  rs, ws, es = select.select([s,], [], [])
  #程序会在此等待直到对象s可读
  print(rs)
  for i in rs:
    if i == s:
      print("s can read now")
      data,addr=s.recvfrom(1024)
      print('received:',data,'from',addr)

2. select.poll()

函数说明: 创建轮询实例。
示例:

>>>poller = select.poll()
>>>print(poller)
<poll>

2.1. select.poll.register(obj, flag)

函数说明:注册一个用以监控的对象,并设置被监控对象的监控标志位flag。

obj:被监控的对象
flag:被监控的标志
    select.POLLIN — 可读
     select.POLLHUP — 已挂断
    select.POLLERR — 出错
    select.POLLOUT — 可写

示例:

>>>READ_ONLY = select.POLLIN | select.POLLHUP | select.POLLERR
>>>READ_WRITE = select.POLLOUT | READ_ONLY
>>>poller.register(s, READ_WRITE)
>>>poller.unregister(s)

2.2. select.poll.unregister(obj)

函数说明:解除监控的对象的注册。

obj:注册过的对象

示例:

>>>READ_ONLY = select.POLLIN | select.POLLHUP | select.POLLERR
>>>READ_WRITE = select.POLLOUT | READ_ONLY
>>>poller.register(s, READ_WRITE)
>>>poller.unregister(s)

2.3. select.poll.modify(obj, flag)

函数说明:修改已注册的对象监控标志。

obj:已注册的被监控对象
flag:修改为的监控标志

示例:

>>>READ_ONLY = select.POLLIN | select.POLLHUP | select.POLLERR
>>>READ_WRITE = select.POLLOUT | READ_ONLY
>>>poller.register(s, READ_WRITE)
>>>poller.modify(s, READ_ONLY)

综合示例

   保存下面代码为.py文件并下载运行。

import socket
import network
import time
import select

SSID = "dfrobotYanfa"   #修改为你WiFi的名称
PASSWORD = "hidfrobot"   #修改为你WiFi的密码
port = 10000   #端口号
wlan = None   #WLAN
s = None   #套接字

wlan = network.WLAN(network.STA_IF)
wlan.active(True)   #激活网络
wlan.disconnect()   #断开上次连接
wlan.connect(SSID, PASSWORD)   #连接WiFi

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   #创建套接字
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)   #设置套接字
ip = wlan.ifconfig()[0]
s.bind((ip, port))  #绑定套接字

def selectTest():
  global s
  rs, ws, es = select.select([s,], [], [])
  #程序会在此等待直到对象s可读
  print(rs)
  for i in rs:
    if i == s:
      print("s can read now")
      data,addr=s.recvfrom(1024)   #接收数据(1024字节)
      print('received:',data,'from',addr)

try:
  while True:
    selectTest()
except:
  if (s):
    s.close()
  wlan.disconnect()   #断开WiFi连接
  wlan.active(False)   #冻结网络

results matching ""

    No results matching ""